package priv.pfz.paxos.role;

import com.google.common.collect.Lists;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import priv.pfz.paxos.common.SleepUtil;
import priv.pfz.paxos.network.RpcManager;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Assembles one Paxos run: a configured {@link RpcManager}, a group of acceptors,
 * and a group of proposers that each start with a random single-letter value.
 * {@link #run()} starts every proposer and blocks until all of them report finished,
 * then checks that they all hold the same proposal value.
 *
 * @author pengfangzhou
 * @date 2022/2/23 22:02
 */
@Slf4j
public class PaxosCluster {
    /** How many proposers to create; they are named P1..Pn. */
    @Setter
    private int proposerNum = 3;
    /** How many acceptors to create; they are named A1..An. */
    @Setter
    private int acceptorNum = 5;
    /** Lower latency bound handed to the RpcManager (unit presumably milliseconds; confirm in RpcManager). */
    @Setter
    private int minLatency = 10;
    /** Upper latency bound handed to the RpcManager (unit presumably milliseconds; confirm in RpcManager). */
    @Setter
    private int maxLatency = 1000;
    /** Loss rate handed to the RpcManager (presumably a percentage; confirm in RpcManager). */
    @Setter
    private int lostRate = 15;

    /**
     * Builds the cluster, starts the prepare phase on every proposer, and polls
     * until every proposer reports {@code isFinish()}.
     *
     * @throws RuntimeException if the finished proposers do not all hold exactly one common value
     */
    public void run() {
        RpcManager rpcManager = createRpcManager();
        AcceptorInfo acceptorInfo = createAcceptorInfo();
        List<Proposer> proposers = createProposers(acceptorInfo, rpcManager);

        // Start: every proposer enters its prepare phase, in creation order.
        proposers.forEach(Proposer::doPreparePhase);

        // Poll: sleep first, then re-check, until no proposer is still unfinished.
        boolean stillRunning;
        do {
            SleepUtil.sleep(1000);
            stillRunning = proposers.stream().anyMatch(proposer -> !proposer.isFinish());
        } while (stillRunning);

        // Consensus holds only if the proposers' values collapse to a single distinct value.
        Set<String> distinctValues = proposers.stream()
                .map(Proposer::getProposalValue)
                .collect(Collectors.toSet());
        if (distinctValues.size() != 1) {
            throw new RuntimeException("未达成一致");
        }
        log.info("达成一致");
    }

    /** Creates the RpcManager and copies this cluster's latency and loss settings onto it. */
    private RpcManager createRpcManager() {
        RpcManager manager = new RpcManager();
        manager.setMinLatency(minLatency);
        manager.setMaxLatency(maxLatency);
        manager.setLostRate(lostRate);
        return manager;
    }

    /** Registers acceptors A1..An and records the total and the majority (n / 2 + 1). */
    private AcceptorInfo createAcceptorInfo() {
        AcceptorInfo info = new AcceptorInfo();
        for (int index = 0; index < acceptorNum; index++) {
            String id = "A" + (index + 1);
            info.getAcceptorMap().put(id, new Acceptor(id));
            log.info("acceptor:{}", id);
        }
        info.setTotal(acceptorNum);
        int majority = acceptorNum / 2 + 1;
        info.setMajority(majority);
        return info;
    }

    /** Creates proposers P1..Pn, each seeded with a random one-letter proposal value. */
    private List<Proposer> createProposers(AcceptorInfo acceptorInfo, RpcManager rpcManager) {
        List<Proposer> created = Lists.newArrayList();
        for (int index = 0; index < proposerNum; index++) {
            String id = "P" + (index + 1);
            String initialValue = RandomStringUtils.randomAlphabetic(1);
            created.add(new Proposer(id, initialValue, acceptorInfo, rpcManager));
            log.info("proposer:{}, proposalValue:{}", id, initialValue);
        }
        return created;
    }
}
